// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package nomad

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"strings"
	"time"

	"github.com/dustin/go-humanize"
	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-memdb"
	metrics "github.com/hashicorp/go-metrics/compat"
	"github.com/hashicorp/go-multierror"

	"github.com/hashicorp/nomad/acl"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/state/paginator"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/plugins/csi"
)

// CSIVolume wraps the structs.CSIVolume with request data and server context.
// It is the receiver for the "CSIVolume.*" RPC endpoints in this file.
type CSIVolume struct {
	// srv is the server handling the RPC; endpoints use it to forward
	// requests, resolve ACLs, read state, and apply raft log entries.
	srv    *Server
	// ctx is the per-connection RPC context passed to srv.Authenticate.
	ctx    *RPCContext
	// logger is the server logger named "csi_volume" (see
	// NewCSIVolumeEndpoint).
	logger hclog.Logger
}

// NewCSIVolumeEndpoint builds the CSIVolume RPC endpoint bound to srv and the
// given RPC context, with a logger derived from the server's logger and named
// "csi_volume".
func NewCSIVolumeEndpoint(srv *Server, ctx *RPCContext) *CSIVolume {
	endpoint := &CSIVolume{srv: srv, ctx: ctx}
	endpoint.logger = srv.logger.Named("csi_volume")
	return endpoint
}

// State store table names. csiVolumeTable is passed to replySetIndex so
// volume query replies report the last index that modified the volumes
// table. csiPluginTable is not referenced in this part of the file;
// presumably it is used the same way by the CSI plugin endpoint (verify).
const (
	csiVolumeTable = "csi_volumes"
	csiPluginTable = "csi_plugins"
)

// replySetIndex stores in reply the latest raft index that modified the given
// state store table, then fills in the rest of the query metadata via
// setQueryMeta. Errors from the index lookup are returned unchanged.
func (s *Server) replySetIndex(table string, reply *structs.QueryMeta) error {
	idx, err := s.fsm.State().Index(table)
	if err != nil {
		return err
	}

	reply.Index = idx
	s.setQueryMeta(reply)
	return nil
}

// List replies with the CSI volumes visible to the caller as paginated list
// stubs. The request may narrow the result by node ID, plugin ID, ID prefix,
// and namespace (structs.AllNamespacesSentinel selects every namespace).
// The caller must pass the namespace ACL check for the request's namespace.
func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIVolumeListResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIVolume.List", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_volume", structs.RateMetricList, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}

	canList := acl.NamespaceValidator(acl.NamespaceCapabilityCSIListVolume,
		acl.NamespaceCapabilityCSIReadVolume,
		acl.NamespaceCapabilityCSIMountVolume,
		acl.NamespaceCapabilityListJobs)
	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}

	ns := args.RequestNamespace()
	if !canList(aclObj, ns) {
		return structs.ErrPermissionDenied
	}

	defer metrics.MeasureSince([]string{"nomad", "volume", "list"}, time.Now())

	opts := blockingOptions{
		queryOpts: &args.QueryOptions,
		queryMeta: &reply.QueryMeta,
		run: func(ws memdb.WatchSet, store *state.StateStore) error {
			snap, err := store.Snapshot()
			if err != nil {
				return err
			}

			// Choose the most specific index for the filters supplied;
			// anything the chosen index can't filter is handled by keep.
			prefix := args.Prefix
			var iter memdb.ResultIterator
			switch {
			case args.NodeID != "":
				iter, err = snap.CSIVolumesByNodeID(ws, prefix, args.NodeID)
			case args.PluginID != "":
				iter, err = snap.CSIVolumesByPluginID(ws, ns, prefix, args.PluginID)
			case prefix != "":
				iter, err = snap.CSIVolumesByIDPrefix(ws, ns, prefix)
			case ns != structs.AllNamespacesSentinel:
				iter, err = snap.CSIVolumesByNamespace(ws, ns, prefix)
			default:
				iter, err = snap.CSIVolumes(ws)
			}
			if err != nil {
				return err
			}

			// The node-ID lookup takes neither a plugin ID nor a namespace,
			// so both filters are (re-)applied to every candidate here.
			keep := func(vol *structs.CSIVolume) bool {
				if args.PluginID != "" && vol.PluginID != args.PluginID {
					return false
				}
				return ns == structs.AllNamespacesSentinel || vol.Namespace == ns
			}

			// Denormalize a copy so the state store's object isn't mutated.
			toStub := func(vol *structs.CSIVolume) (*structs.CSIVolListStub, error) {
				full, err := snap.CSIVolumeDenormalizePlugins(ws, vol.Copy())
				if err != nil {
					return nil, err
				}
				return full.Stub(), nil
			}

			pager, err := paginator.NewPaginator(iter, args.QueryOptions, keep,
				paginator.NamespaceIDTokenizer[*structs.CSIVolume](args.NextToken),
				toStub)
			if err != nil {
				return structs.NewErrRPCCodedf(
					http.StatusBadRequest, "failed to create result paginator: %v", err)
			}

			stubs, nextToken, err := pager.Page()
			if err != nil {
				return structs.NewErrRPCCodedf(
					http.StatusBadRequest, "failed to read result page: %v", err)
			}

			reply.QueryMeta.NextToken = nextToken
			reply.Volumes = stubs
			return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta)
		},
	}
	return v.srv.blockingRPC(&opts)
}

// Get fetches detailed information about a specific volume as a blocking
// query. The volume is looked up by args.ID in the request's namespace and
// denormalized before being returned; reply.Volume is nil when no such
// volume exists. An empty ID is rejected before any state is read.
func (v *CSIVolume) Get(args *structs.CSIVolumeGetRequest, reply *structs.CSIVolumeGetResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIVolume.Get", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_volume", structs.RateMetricRead, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}

	canRead := acl.NamespaceValidator(acl.NamespaceCapabilityCSIReadVolume,
		acl.NamespaceCapabilityCSIMountVolume,
		acl.NamespaceCapabilityReadJob)
	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}

	ns := args.RequestNamespace()
	if !canRead(aclObj, ns) {
		return structs.ErrPermissionDenied
	}

	defer metrics.MeasureSince([]string{"nomad", "volume", "get"}, time.Now())

	if args.ID == "" {
		return fmt.Errorf("missing volume ID")
	}

	return v.srv.blockingRPC(&blockingOptions{
		queryOpts: &args.QueryOptions,
		queryMeta: &reply.QueryMeta,
		run: func(ws memdb.WatchSet, store *state.StateStore) error {
			snap, err := store.Snapshot()
			if err != nil {
				return err
			}

			vol, err := snap.CSIVolumeByID(ws, ns, args.ID)
			switch {
			case err != nil:
				return err
			case vol != nil:
				// Only a found volume is denormalized; a missing one is
				// replied as nil.
				if vol, err = snap.CSIVolumeDenormalize(ws, vol); err != nil {
					return err
				}
			}

			reply.Volume = vol
			return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta)
		},
	})
}

// pluginValidateVolume resolves the plugin named by vol.PluginID from current
// state and checks it can serve the volume. The plugin must exist, and if it
// requires a controller it must have at least one healthy controller. On
// success it copies the plugin's Provider and Version onto vol and returns
// the plugin.
func (v *CSIVolume) pluginValidateVolume(vol *structs.CSIVolume) (*structs.CSIPlugin, error) {
	plugin, err := v.srv.fsm.State().CSIPluginByID(nil, vol.PluginID)
	switch {
	case err != nil:
		return nil, err
	case plugin == nil:
		return nil, fmt.Errorf("no CSI plugin named: %s could be found", vol.PluginID)
	case plugin.ControllerRequired && plugin.ControllersHealthy < 1:
		return nil, fmt.Errorf("no healthy controllers for CSI plugin: %s", vol.PluginID)
	}

	vol.Provider, vol.ProviderVersion = plugin.Provider, plugin.Version
	return plugin, nil
}

// controllerValidateVolume asks the plugin's controller to validate the
// volume's requested capabilities via the ClientCSI.ControllerValidateVolume
// RPC and returns that RPC's error. Plugins that don't require a controller
// are not validated further. The req argument is currently unused.
func (v *CSIVolume) controllerValidateVolume(req *structs.CSIVolumeRegisterRequest, vol *structs.CSIVolume, plugin *structs.CSIPlugin) error {
	if !plugin.ControllerRequired {
		// Without a controller there is nothing further to validate (yet).
		return nil
	}

	validateReq := &cstructs.ClientCSIControllerValidateVolumeRequest{
		VolumeID:           vol.RemoteID(),
		VolumeCapabilities: vol.RequestedCapabilities,
		Secrets:            vol.Secrets,
		Parameters:         vol.Parameters,
		Context:            vol.Context,
	}
	validateReq.PluginID = plugin.ID

	return v.srv.RPC("ClientCSI.ControllerValidateVolume", validateReq,
		&cstructs.ClientCSIControllerValidateVolumeResponse{})
}

// Register registers a new volume or updates an existing volume.
//
// Note that most user-defined CSIVolume fields are immutable once
// the volume has been created, but exceptions include min and max
// requested capacity values.
//
// If the user needs to change fields because they've misconfigured
// the registration of the external volume, we expect that claims
// won't work either, and the user can deregister the volume and try
// again with the right settings. This lets us be as strict with
// validation here as the CreateVolume CSI RPC is expected to be.
//
// The caller needs plugin read access, plus the CSI write-volume capability
// in the namespace of every volume in the request. Volumes without a
// namespace are placed in the request's namespace. All volumes are
// validated before a single raft apply writes the whole request.
func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *structs.CSIVolumeRegisterResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIVolume.Register", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_volume", structs.RateMetricWrite, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}

	allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIWriteVolume)
	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}

	defer metrics.MeasureSince([]string{"nomad", "volume", "register"}, time.Now())

	// permission for the volume namespaces will be checked below
	if !aclObj.AllowPluginRead() {
		return structs.ErrPermissionDenied
	}

	if len(args.Volumes) == 0 {
		return fmt.Errorf("missing volume definition")
	}

	snap, err := v.srv.State().Snapshot()
	if err != nil {
		return err
	}

	// Validate ACLs, that the plugin exists for each volume, and validate the
	// capabilities when the plugin has a controller.
	for _, vol := range args.Volumes {
		if vol.Namespace == "" {
			vol.Namespace = args.RequestNamespace()
		}
		if !allowVolume(aclObj, vol.Namespace) {
			return structs.ErrPermissionDenied
		}
		if err = vol.Validate(); err != nil {
			return err
		}

		existingVol, err := snap.CSIVolumeByID(nil, vol.Namespace, vol.ID)
		if err != nil {
			return err
		}

		plugin, err := v.pluginValidateVolume(vol)
		if err != nil {
			return err
		}

		// CSIVolume has many user-defined fields which are immutable
		// once set, and many fields that are controlled by Nomad and
		// are not user-settable. We merge onto a copy of the existing
		// volume to allow a user to submit a volume spec for `volume
		// create` and reuse it for updates in `volume register`
		// without having to manually remove the fields unused by
		// register (and similar use cases with API consumers such as
		// Terraform).
		if existingVol != nil {
			existingVol = existingVol.Copy()

			// reconcile mutable fields
			if err = v.reconcileVolume(plugin, existingVol, vol); err != nil {
				// %w keeps the underlying error inspectable with errors.Is/As.
				return fmt.Errorf("unable to update volume: %w", err)
			}

			*vol = *existingVol

		} else if len(vol.Topologies) == 0 {
			// An existing volume keeps the topologies set when it was
			// created; only a newly registered volume takes its topology
			// from the user's required topologies.
			if vol.RequestedTopologies != nil {
				vol.Topologies = vol.RequestedTopologies.Required
			}
		}

		if err := v.controllerValidateVolume(args, vol, plugin); err != nil {
			return err
		}
	}

	_, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, args)
	if err != nil {
		v.logger.Error("csi raft apply failed", "error", err, "method", "register")
		return err
	}

	reply.Index = index
	v.srv.setQueryMeta(&reply.QueryMeta)
	return nil
}

// reconcileVolume folds update into vol in place. vol.Merge validates the
// update and copies over most fields, but not the capacity values. Those are
// handled by expandVolume, which may do extra work to expand the volume
// outside of Nomad, depending on whether the requested capacity values have
// changed. Register passes a copy of the stored volume as vol.
func (v *CSIVolume) reconcileVolume(plugin *structs.CSIPlugin, vol *structs.CSIVolume, update *structs.CSIVolume) error {
	// Validation happens inside Merge, before any CSI RPC can be attempted.
	if err := vol.Merge(update); err != nil {
		return err
	}

	want := &csi.CapacityRange{
		RequiredBytes: update.RequestedCapacityMin,
		LimitBytes:    update.RequestedCapacityMax,
	}
	// expandVolume updates vol's capacity-related fields if needed.
	return v.expandVolume(vol, plugin, want)
}

// Deregister removes the volumes listed in args.VolumeIDs via a single raft
// apply. The caller needs the CSI write-volume capability in the request's
// namespace, and at least one volume ID must be given.
func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply *structs.CSIVolumeDeregisterResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIVolume.Deregister", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_volume", structs.RateMetricWrite, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}

	canWrite := acl.NamespaceValidator(acl.NamespaceCapabilityCSIWriteVolume)
	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}

	defer metrics.MeasureSince([]string{"nomad", "volume", "deregister"}, time.Now())

	if !canWrite(aclObj, args.RequestNamespace()) {
		return structs.ErrPermissionDenied
	}
	if len(args.VolumeIDs) == 0 {
		return fmt.Errorf("missing volume IDs")
	}

	_, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, args)
	if err != nil {
		v.logger.Error("csi raft apply failed", "error", err, "method", "deregister")
		return err
	}

	reply.Index = index
	v.srv.setQueryMeta(&reply.QueryMeta)
	return nil
}

// Claim submits a change to a volume claim. The caller must be allowed client
// operations (aclObj.AllowClientOp). The claim is written to raft first. For a
// new claim (a non-GC claim in the "taken" state), the reply is then filled in
// by controllerPublishVolume with the volume and, for plugins that attach
// volumes, the controller's PublishContext.
func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CSIVolumeClaimResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIVolume.Claim", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_volume", structs.RateMetricWrite, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}

	defer metrics.MeasureSince([]string{"nomad", "volume", "claim"}, time.Now())

	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}
	if !aclObj.AllowClientOp() {
		return structs.ErrPermissionDenied
	}

	if args.VolumeID == "" {
		return fmt.Errorf("missing volume ID")
	}

	isNewClaim := args.Claim != structs.CSIVolumeClaimGC &&
		args.State == structs.CSIVolumeClaimStateTaken
	// COMPAT(1.0): the NodeID field was added after 0.11.0 and so we
	// need to ensure it's been populated during upgrades from 0.11.0
	// to later patch versions. Remove this block in 1.0
	if isNewClaim && args.NodeID == "" {
		state := v.srv.fsm.State()
		ws := memdb.NewWatchSet()
		alloc, err := state.AllocByID(ws, args.AllocationID)
		if err != nil {
			return err
		}
		if alloc == nil {
			return fmt.Errorf("%s: %s",
				structs.ErrUnknownAllocationPrefix, args.AllocationID)
		}
		args.NodeID = alloc.NodeID
	}

	_, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
	if err != nil {
		v.logger.Error("csi raft apply failed", "error", err, "method", "claim")
		return err
	}

	if isNewClaim {
		// if this is a new claim, add a Volume and PublishContext from the
		// controller (if any) to the reply. The claim has already been
		// written to raft at this point.
		err = v.controllerPublishVolume(args, reply)
		if err != nil {
			// %w (not %v) so wrapped sentinels such as structs.ErrUnknownNode
			// stay matchable with errors.Is for in-process callers.
			return fmt.Errorf("controller publish: %w", err)
		}
	}

	reply.Index = index
	v.srv.setQueryMeta(&reply.QueryMeta)
	return nil
}

// csiVolumeMountOptions converts a volume's mount options into the form sent
// to clients (FSType becomes Filesystem, MountFlags is passed through). A nil
// input yields nil.
func csiVolumeMountOptions(c *structs.CSIMountOptions) *cstructs.CSIVolumeMountOptions {
	var opts *cstructs.CSIVolumeMountOptions
	if c != nil {
		opts = &cstructs.CSIVolumeMountOptions{
			Filesystem: c.FSType,
			MountFlags: c.MountFlags,
		}
	}
	return opts
}

// controllerPublishVolume sends publish request to the CSI controller
// plugin associated with a volume, if any.
//
// It always sets resp.Volume from the lookup and checks that the claiming
// allocation exists. The ClientCSI.ControllerAttachVolume RPC is sent only
// when the plugin supports attach/detach, and its PublishContext is copied
// into resp. An empty req.ExternalNodeID is filled in (mutating req) via
// lookupExternalNodeID. Controller errors whose text contains
// "FailedPrecondition" are wrapped with structs.ErrCSIClientRPCRetryable.
func (v *CSIVolume) controllerPublishVolume(req *structs.CSIVolumeClaimRequest, resp *structs.CSIVolumeClaimResponse) error {
	plug, vol, err := v.volAndPluginLookup(req.RequestNamespace(), req.VolumeID)
	if err != nil {
		return err
	}

	// Set the Response volume from the lookup
	resp.Volume = vol

	// Validate the existence of the allocation, regardless of whether we need it
	// now.
	state := v.srv.fsm.State()
	ws := memdb.NewWatchSet()
	alloc, err := state.AllocByID(ws, req.AllocationID)
	if err != nil {
		return err
	}
	if alloc == nil {
		return fmt.Errorf("%s: %s", structs.ErrUnknownAllocationPrefix, req.AllocationID)
	}

	// Some plugins support controllers for create/snapshot but not attach. So
	// if there's no plugin or the plugin doesn't attach volumes, then we can
	// skip the controller publish workflow and return nil.
	if plug == nil || !plug.HasControllerCapability(structs.CSIControllerSupportsAttachDetach) {
		return nil
	}

	// make sure the alloc's client node (by Nomad's node ID, not the storage
	// provider's ID) is still known before asking the controller to attach
	targetNode, err := state.NodeByID(ws, alloc.NodeID)
	if err != nil {
		return err
	}
	if targetNode == nil {
		return fmt.Errorf("%w %s", structs.ErrUnknownNode, alloc.NodeID)
	}

	// if the RPC is sent by a client node, it may not know the claim's
	// external node ID.
	if req.ExternalNodeID == "" {
		externalNodeID, err := v.lookupExternalNodeID(vol, req.ToClaim())
		if err != nil {
			// %w preserves a wrapped structs.ErrUnknownNode for callers
			return fmt.Errorf("missing external node ID: %w", err)
		}
		req.ExternalNodeID = externalNodeID
	}

	method := "ClientCSI.ControllerAttachVolume"
	cReq := &cstructs.ClientCSIControllerAttachVolumeRequest{
		VolumeID:        vol.RemoteID(),
		ClientCSINodeID: req.ExternalNodeID,
		AttachmentMode:  req.AttachmentMode,
		AccessMode:      req.AccessMode,
		MountOptions:    csiVolumeMountOptions(vol.MountOptions),
		ReadOnly:        req.Claim == structs.CSIVolumeClaimRead,
		Secrets:         vol.Secrets,
		VolumeContext:   vol.Context,
	}
	cReq.PluginID = plug.ID
	cResp := &cstructs.ClientCSIControllerAttachVolumeResponse{}

	err = v.serializedControllerRPC(plug.ID, func() error {
		return v.srv.RPC(method, cReq, cResp)
	})
	if err != nil {
		// the controller error arrives as text, so it's matched by message;
		// the retryable sentinel is wrapped with %w so errors.Is can find it
		// (the rendered message is unchanged from %v)
		if strings.Contains(err.Error(), "FailedPrecondition") {
			return fmt.Errorf("%w: %v", structs.ErrCSIClientRPCRetryable, err)
		}
		return err
	}
	resp.PublishContext = cResp.PublishContext
	return nil
}

// volAndPluginLookup fetches volume volID in namespace and, only when the
// volume requires a controller, its plugin. The returned plugin is nil for
// volumes that don't require a controller. A missing volume or missing plugin
// is reported as an error.
func (v *CSIVolume) volAndPluginLookup(namespace, volID string) (*structs.CSIPlugin, *structs.CSIVolume, error) {
	store := v.srv.fsm.State()

	vol, err := store.CSIVolumeByID(nil, namespace, volID)
	switch {
	case err != nil:
		return nil, nil, err
	case vol == nil:
		return nil, nil, fmt.Errorf("volume not found: %s", volID)
	case !vol.ControllerRequired:
		return nil, vol, nil
	}

	// CSIVolumeByID performs this same plugin lookup but discards the plugin
	// instead of attaching it to the volume, so it is fetched again here.
	plug, err := store.CSIPluginByID(nil, vol.PluginID)
	if err != nil {
		return nil, nil, err
	}
	if plug == nil {
		return nil, nil, fmt.Errorf("plugin not found: %s", vol.PluginID)
	}
	return plug, vol, nil
}

// serializedControllerRPC ensures we're only sending a single controller RPC to
// a given plugin if the RPC can cause conflicting state changes.
//
// The CSI specification says that we SHOULD send no more than one in-flight
// request per *volume* at a time, with an allowance for losing state
// (ex. leadership transitions) which the plugins SHOULD handle gracefully.
//
// In practice many CSI plugins rely on k8s-specific sidecars for serializing
// storage provider API calls globally (ex. concurrently attaching EBS volumes
// to an EC2 instance results in a race for device names). So we have to be much
// more conservative about concurrency in Nomad than the spec allows.
//
// Mechanism: v.srv.volumeControllerFutures maps a plugin ID to a context
// representing the RPC currently in flight for that plugin, and is guarded by
// v.srv.volumeControllerLock. A caller that finds no entry installs its own
// cancelable context, runs fn without holding the lock, then cancels and
// removes the entry. A caller that finds an entry waits until that context is
// done and tries again. If v.srv.shutdownCh fires while waiting,
// structs.ErrNoLeader is returned and fn is never called. Otherwise fn's
// error is returned unchanged.
//
// NOTE(review): if fn panics, the entry is neither canceled nor removed, so
// later callers for the same plugin would wait until server shutdown.
func (v *CSIVolume) serializedControllerRPC(pluginID string, fn func() error) error {

	// Loop until this caller owns the plugin's slot (or the server shuts down).
	for {
		v.srv.volumeControllerLock.Lock()
		future := v.srv.volumeControllerFutures[pluginID]
		if future == nil {
			// No RPC in flight for this plugin: take the slot. The future is
			// derived from shutdownCtx, so it is also done on shutdown. Note
			// this := shadows the outer (nil) future.
			future, futureDone := context.WithCancel(v.srv.shutdownCtx)
			v.srv.volumeControllerFutures[pluginID] = future
			v.srv.volumeControllerLock.Unlock()

			// fn runs without the lock so waiters aren't blocked on the mutex.
			err := fn()

			// close the future while holding the lock and not in a defer so
			// that we can ensure we've cleared it from the map before allowing
			// anyone else to take the lock and write a new one
			v.srv.volumeControllerLock.Lock()
			futureDone()
			delete(v.srv.volumeControllerFutures, pluginID)
			v.srv.volumeControllerLock.Unlock()

			return err
		} else {
			v.srv.volumeControllerLock.Unlock()

			// Another RPC owns the slot: wait for it to finish, then loop
			// back and try to claim the slot again.
			select {
			case <-future.Done():
				continue
			case <-v.srv.shutdownCh:
				// The csi_hook publish workflow on the client will retry if it
				// gets this error. On unpublish, we don't want to block client
				// shutdown so we give up on error. The new leader's
				// volumewatcher will iterate all the claims at startup to
				// detect this and mop up any claims in the NodeDetached state
				// (volume GC will run periodically as well)
				return structs.ErrNoLeader
			}
		}
	}
}

// allowCSIMount is called on Job register to check mount permission. It
// requires plugin read access and, only if that is granted, the CSI
// mount-volume capability in the given namespace.
func allowCSIMount(aclObj *acl.ACL, namespace string) bool {
	if !aclObj.AllowPluginRead() {
		return false
	}
	return aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityCSIMountVolume)
}

// Unpublish synchronously sends the NodeUnpublish, NodeUnstage, and
// ControllerUnpublish RPCs to the client. It handles errors according to the
// current claim state. Each completed step is checkpointed to raft, so a
// retried request whose claim is already NodeDetached skips the node step,
// and one already ControllerDetached or ReadyToFree goes straight to
// releasing the claim.
func (v *CSIVolume) Unpublish(args *structs.CSIVolumeUnpublishRequest, reply *structs.CSIVolumeUnpublishResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIVolume.Unpublish", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_volume", structs.RateMetricWrite, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}

	defer metrics.MeasureSince([]string{"nomad", "volume", "unpublish"}, time.Now())

	allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIMountVolume)
	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}

	// Use one namespace for both the ACL check and the volume lookup.
	// args.Namespace may be empty; RequestNamespace() is what the ACL check
	// (and the other endpoints in this file) use, so the lookup must agree.
	ns := args.RequestNamespace()

	// this RPC is called by both clients and by `nomad volume detach`. we can't
	// safely match the node ID for client RPCs because we may not have the node
	// ID anymore
	if !aclObj.AllowClientOp() &&
		!(allowVolume(aclObj, ns) && aclObj.AllowPluginRead()) {
		return structs.ErrPermissionDenied
	}

	if args.VolumeID == "" {
		return fmt.Errorf("missing volume ID")
	}
	if args.Claim == nil {
		return fmt.Errorf("missing volume claim")
	}

	ws := memdb.NewWatchSet()
	state := v.srv.fsm.State()
	vol, err := state.CSIVolumeByID(ws, ns, args.VolumeID)
	if err != nil {
		return err
	}
	if vol == nil {
		return fmt.Errorf("no such volume")
	}

	claim := args.Claim

	// We checkpoint as soon as we receive the claim so that the initial
	// "past claim" state is recorded. Otherwise, a client that unpublishes
	// (skipping the node unpublish because it has already done that work)
	// would never have its claim written if the controller unpublish fails.
	vol = vol.Copy()
	err = v.checkpointClaim(vol, claim)
	if err != nil {
		return err
	}

	// previous checkpoints may have set the past claim state already.
	// in practice we should never see CSIVolumeClaimStateControllerDetached
	// but having an option for the state makes it easy to add a checkpoint
	// in a backwards compatible way if we need one later
	switch claim.State {
	case structs.CSIVolumeClaimStateNodeDetached:
		goto NODE_DETACHED
	case structs.CSIVolumeClaimStateControllerDetached:
		goto RELEASE_CLAIM
	case structs.CSIVolumeClaimStateReadyToFree:
		goto RELEASE_CLAIM
	}
	vol = vol.Copy()
	err = v.nodeUnpublishVolume(vol, claim)
	if err != nil {
		return err
	}

NODE_DETACHED:
	vol = vol.Copy()
	err = v.controllerUnpublishVolume(vol, claim)
	if err != nil {
		return err
	}

RELEASE_CLAIM:
	v.logger.Trace("releasing claim", "vol", vol.ID)
	// advance a CSIVolumeClaimStateControllerDetached claim
	claim.State = structs.CSIVolumeClaimStateReadyToFree
	err = v.checkpointClaim(vol, claim)
	if err != nil {
		return err
	}

	reply.Index = vol.ModifyIndex
	v.srv.setQueryMeta(&reply.QueryMeta)
	return nil
}

// nodeUnpublishVolume sends the node-side detach RPC(s) for a claim and then
// checkpoints the claim as CSIVolumeClaimStateNodeDetached. Typically the
// client has already done this work itself; this server-side path exists so
// GC can re-send it after client-side problems. Call it only on a copy of the
// volume, because the volume is denormalized and its ModifyIndex updated.
func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
	v.logger.Trace("node unpublish", "vol", vol.ID)

	// Earlier checkpoints changed state, so read from a fresh snapshot.
	snap, err := v.srv.fsm.State().Snapshot()
	if err != nil {
		return err
	}

	// A GC'd or down node can't be sent a node unpublish, so assume it has
	// already unpublished on its end. If it hasn't, the controller unpublish
	// may hang or error and need to be retried.
	if claim.NodeID != "" {
		node, err := snap.NodeByID(memdb.NewWatchSet(), claim.NodeID)
		if err != nil {
			return err
		}
		if node == nil || node.Status == structs.NodeStatusDown {
			v.logger.Debug("skipping node unpublish for down or GC'd node")
			claim.State = structs.CSIVolumeClaimStateNodeDetached
			return v.checkpointClaim(vol, claim)
		}
	}

	// A claim tied to an allocation only needs that one detach.
	if claim.AllocationID != "" {
		if err := v.nodeUnpublishVolumeImpl(vol, claim); err != nil {
			return err
		}
		claim.State = structs.CSIVolumeClaimStateNodeDetached
		return v.checkpointClaim(vol, claim)
	}

	// Requests from `nomad node detach` or GC have no allocation ID, so
	// detach every past claim on the claim's node. After denormalizing, all
	// terminal or invalid allocs on the node appear in PastClaims.
	vol, err = snap.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
	if err != nil {
		return err
	}

	var merr multierror.Error
	for _, past := range vol.PastClaims {
		if past.NodeID != claim.NodeID {
			continue
		}
		if err := v.nodeUnpublishVolumeImpl(vol, past); err != nil {
			merr.Errors = append(merr.Errors, err)
		}
	}
	if err := merr.ErrorOrNil(); err != nil {
		return err
	}

	claim.State = structs.CSIVolumeClaimStateNodeDetached
	return v.checkpointClaim(vol, claim)
}

// nodeUnpublishVolumeImpl sends one ClientCSI.NodeDetachVolume RPC for a
// single claim. A claim whose access mode is unknown has already been
// released client-side and is skipped. An "unknown node" RPC error is
// ignored: the node is gone, so there's no reason to treat the volume as
// still attached to it. Any other error is wrapped and returned.
func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
	if claim.AccessMode == structs.CSIVolumeAccessModeUnknown {
		return nil
	}

	detachReq := &cstructs.ClientCSINodeDetachVolumeRequest{
		PluginID:        vol.PluginID,
		VolumeID:        vol.ID,
		VolumeNamespace: vol.Namespace,
		ExternalID:      vol.RemoteID(),
		AllocID:         claim.AllocationID,
		NodeID:          claim.NodeID,
		AttachmentMode:  claim.AttachmentMode,
		AccessMode:      claim.AccessMode,
		ReadOnly:        claim.Mode == structs.CSIVolumeClaimRead,
	}
	err := v.srv.RPC("ClientCSI.NodeDetachVolume", detachReq,
		&cstructs.ClientCSINodeDetachVolumeResponse{})

	// The RPC layer breaks error wrapping, so the unknown-node case is
	// recognized by its message text rather than with errors.Is.
	if err == nil || strings.Contains(err.Error(), structs.ErrUnknownNode.Error()) {
		return nil
	}
	return fmt.Errorf("could not detach from node: %w", err)
}

// controllerUnpublishVolume handles the sending RPCs to the Controller plugin
// to unpublish the volume (detach it from its host). This function should only
// be called on a copy of the volume.
//
// claim.State is advanced to CSIVolumeClaimStateReadyToFree when:
//   - no controller detach is needed (the volume doesn't require a controller,
//     or the plugin lacks attach/detach support),
//   - another non-terminal alloc on the same node still uses the volume,
//   - the claim's node is unknown (GC'd), or
//   - the controller detach succeeds.
//
// Only the last two cases checkpoint the claim here. Errors from the plugin
// lookup, external node ID lookup, and controller RPC are wrapped with %w,
// matching nodeUnpublishVolumeImpl.
func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
	v.logger.Trace("controller unpublish", "vol", vol.ID)

	if !vol.ControllerRequired {
		claim.State = structs.CSIVolumeClaimStateReadyToFree
		return nil
	}

	// We need a new snapshot after each checkpoint
	snap, err := v.srv.fsm.State().Snapshot()
	if err != nil {
		return err
	}

	ws := memdb.NewWatchSet()

	plugin, err := snap.CSIPluginByID(ws, vol.PluginID)
	if err != nil {
		return fmt.Errorf("could not query plugin: %w", err)
	} else if plugin == nil {
		return fmt.Errorf("no such plugin: %q", vol.PluginID)
	}

	if !plugin.HasControllerCapability(structs.CSIControllerSupportsAttachDetach) {
		claim.State = structs.CSIVolumeClaimStateReadyToFree
		return nil
	}

	vol, err = snap.CSIVolumeDenormalize(ws, vol)
	if err != nil {
		return err
	}

	// we only send a controller detach if a Nomad client no longer has any
	// claim to the volume, so we need to check the status of any other claimed
	// allocations
	shouldCancel := func(alloc *structs.Allocation) bool {
		if alloc != nil && alloc.ID != claim.AllocationID &&
			alloc.NodeID == claim.NodeID && !alloc.TerminalStatus() {
			claim.State = structs.CSIVolumeClaimStateReadyToFree
			v.logger.Debug(
				"controller unpublish canceled: another non-terminal alloc is on this node",
				"vol", vol.ID, "alloc", alloc.ID)
			return true
		}
		return false
	}

	for _, alloc := range vol.ReadAllocs {
		if shouldCancel(alloc) {
			return nil
		}
	}
	for _, alloc := range vol.WriteAllocs {
		if shouldCancel(alloc) {
			return nil
		}
	}

	// if the RPC is sent by a client node, it may not know the claim's
	// external node ID.
	if claim.ExternalNodeID == "" {
		externalNodeID, err := v.lookupExternalNodeID(vol, claim)
		if err != nil {
			// if the node has been GC'd, there's no path for us to ever send
			// the controller detach, so assume the node is gone
			if errors.Is(err, structs.ErrUnknownNode) {
				v.logger.Trace("controller detach skipped for missing node", "vol", vol.ID)
				claim.State = structs.CSIVolumeClaimStateReadyToFree
				return v.checkpointClaim(vol, claim)
			}

			return fmt.Errorf("missing external node ID: %w", err)
		}
		claim.ExternalNodeID = externalNodeID
	}

	req := &cstructs.ClientCSIControllerDetachVolumeRequest{
		VolumeID:        vol.RemoteID(),
		ClientCSINodeID: claim.ExternalNodeID,
		Secrets:         vol.Secrets,
	}
	req.PluginID = vol.PluginID

	err = v.serializedControllerRPC(vol.PluginID, func() error {
		return v.srv.RPC("ClientCSI.ControllerDetachVolume", req,
			&cstructs.ClientCSIControllerDetachVolumeResponse{})
	})
	if err != nil {
		return fmt.Errorf("could not detach from controller: %w", err)
	}

	v.logger.Trace("controller detach complete", "vol", vol.ID)
	claim.State = structs.CSIVolumeClaimStateReadyToFree
	return v.checkpointClaim(vol, claim)
}

// lookupExternalNodeID returns the CSI plugin's (storage provider's) ID for
// the node that holds claim. The volume's read, write, and past claims are
// searched first, in that order, because the client may already have been
// stopped and GC'd. Reading the node's fingerprinted CSI node plugin info
// from state is the last resort. An unknown node yields an error wrapping
// structs.ErrUnknownNode.
func (v *CSIVolume) lookupExternalNodeID(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) (string, error) {
	for _, claims := range []map[string]*structs.CSIVolumeClaim{
		vol.ReadClaims, vol.WriteClaims, vol.PastClaims,
	} {
		for _, c := range claims {
			if c.NodeID == claim.NodeID && c.ExternalNodeID != "" {
				return c.ExternalNodeID, nil
			}
		}
	}

	node, err := v.srv.fsm.State().NodeByID(memdb.NewWatchSet(), claim.NodeID)
	if err != nil {
		return "", err
	}
	if node == nil {
		return "", fmt.Errorf("%w %s", structs.ErrUnknownNode, claim.NodeID)
	}

	// The node plugin info holds the storage provider's ID for this client,
	// as opposed to Nomad's own node ID.
	info, ok := node.CSINodePlugins[vol.PluginID]
	if !ok || info.NodeInfo == nil {
		return "", fmt.Errorf("failed to find storage provider info for client %q, node plugin %q is not running or has not fingerprinted on this client", node.ID, vol.PluginID)
	}
	return info.NodeInfo.ID, nil
}

// checkpointClaim persists the claim's current mode and state for vol by
// applying a CSIVolumeClaimRequest through raft. On success the resulting
// raft index is stored in vol.ModifyIndex. On failure the error is logged and
// returned.
func (v *CSIVolume) checkpointClaim(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
	v.logger.Trace("checkpointing claim")

	req := structs.CSIVolumeClaimRequest{
		WriteRequest: structs.WriteRequest{Namespace: vol.Namespace},
		VolumeID:     vol.ID,
		NodeID:       claim.NodeID,
		AllocationID: claim.AllocationID,
		Claim:        claim.Mode,
		State:        claim.State,
	}

	_, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, req)
	if err != nil {
		v.logger.Error("csi raft apply failed", "error", err)
		return err
	}

	vol.ModifyIndex = index
	return nil
}

// Create provisions volumes through their plugin's controller and registers
// them in raft. A volume that is already registered is not re-created; its
// mutable fields are reconciled onto a copy of the registered volume instead.
// Any validation failure aborts the whole request. Provisioning failures are
// collected and returned together, and the volumes that did succeed are
// still written to raft.
func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs.CSIVolumeCreateResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIVolume.Create", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_volume", structs.RateMetricWrite, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "volume", "create"}, time.Now())

	canWriteVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIWriteVolume)
	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}

	// Namespace permissions depend on each volume and are checked in the
	// validation loop; plugin read access is required up front.
	if !aclObj.AllowPluginRead() {
		return structs.ErrPermissionDenied
	}
	if len(args.Volumes) == 0 {
		return fmt.Errorf("missing volume definition")
	}

	snap, err := v.srv.State().Snapshot()
	if err != nil {
		return err
	}

	// pending pairs a requested volume with its plugin and, if the volume is
	// already registered, the registered volume (nil otherwise).
	type pending struct {
		vol      *structs.CSIVolume
		plugin   *structs.CSIPlugin
		existing *structs.CSIVolume
	}
	queue := make([]pending, 0, len(args.Volumes))

	// Validate ACLs, that the plugin exists for each volume, and that the
	// plugin's controller can create volumes, before touching any provider.
	for _, vol := range args.Volumes {
		if vol.Namespace == "" {
			vol.Namespace = args.RequestNamespace()
		}
		if !canWriteVolume(aclObj, vol.Namespace) {
			return structs.ErrPermissionDenied
		}
		if err := vol.Validate(); err != nil {
			return err
		}
		plugin, err := v.pluginValidateVolume(vol)
		if err != nil {
			return err
		}
		switch {
		case !plugin.ControllerRequired:
			return fmt.Errorf("plugin has no controller")
		case !plugin.HasControllerCapability(structs.CSIControllerSupportsCreateDelete):
			return fmt.Errorf("plugin does not support creating volumes")
		}

		existing, err := snap.CSIVolumeByID(nil, vol.Namespace, vol.ID)
		if err != nil {
			return err
		}
		queue = append(queue, pending{vol: vol, plugin: plugin, existing: existing})
	}

	// NOTE: creating the volume in the external storage provider can't be
	// made atomic with the registration, and creating the volume provides
	// values we want to write on the CSIVolume in raft anyways. For now
	// we'll block the RPC on the external storage provider so that we can
	// easily return meaningful errors to the user, but in the future we
	// should consider creating registering first and creating a "volume
	// eval" that can do the plugin RPCs async.
	regArgs := &structs.CSIVolumeRegisterRequest{WriteRequest: args.WriteRequest}
	var mErr multierror.Error

	for _, item := range queue {
		var toRegister *structs.CSIVolume
		if item.existing == nil {
			toRegister = item.vol
			err = v.createVolume(item.vol, item.plugin)
		} else {
			// Merge the request onto a copy so the registered volume read
			// from the snapshot is left untouched.
			toRegister = item.existing.Copy()
			err = v.reconcileVolume(item.plugin, toRegister, item.vol)
		}
		if err != nil {
			mErr.Errors = append(mErr.Errors, err)
			continue
		}
		regArgs.Volumes = append(regArgs.Volumes, toRegister)
	}

	var index uint64
	if len(regArgs.Volumes) > 0 {
		_, index, err = v.srv.raftApply(structs.CSIVolumeRegisterRequestType, regArgs)
		if err != nil {
			v.logger.Error("csi raft apply failed", "error", err, "method", "register")
			mErr.Errors = append(mErr.Errors, err)
		}
	}

	if err := mErr.ErrorOrNil(); err != nil {
		return err
	}

	reply.Volumes = regArgs.Volumes
	reply.Index = index
	v.srv.setQueryMeta(&reply.QueryMeta)
	return nil
}

// createVolume issues a ClientCSI.ControllerCreateVolume RPC for vol against
// the given plugin. On success vol is updated in place with the external ID,
// capacity, volume context and topologies from the controller's response.
func (v *CSIVolume) createVolume(vol *structs.CSIVolume, plugin *structs.CSIPlugin) error {
	req := &cstructs.ClientCSIControllerCreateVolumeRequest{
		Name:                vol.Name,
		VolumeCapabilities:  vol.RequestedCapabilities,
		MountOptions:        vol.MountOptions,
		Parameters:          vol.Parameters,
		Secrets:             vol.Secrets,
		CapacityMin:         vol.RequestedCapacityMin,
		CapacityMax:         vol.RequestedCapacityMax,
		SnapshotID:          vol.SnapshotID,
		CloneID:             vol.CloneID,
		RequestedTopologies: vol.RequestedTopologies,
	}
	req.PluginID = plugin.ID

	resp := &cstructs.ClientCSIControllerCreateVolumeResponse{}
	if err := v.srv.RPC("ClientCSI.ControllerCreateVolume", req, resp); err != nil {
		return err
	}

	vol.ExternalID, vol.Capacity = resp.ExternalVolumeID, resp.CapacityBytes
	vol.Context, vol.Topologies = resp.VolumeContext, resp.Topologies
	return nil
}

// expandVolume validates a requested capacity range for vol. If the new
// minimum exceeds the volume's current capacity, it calls
// ControllerExpandVolume on the plugin through a Nomad client RPC, followed by
// NodeExpandVolume on the claiming nodes when the controller reports that
// node expansion is required.
//
// Capacity can only grow. Nothing is done when both requested values are
// zero or when they equal the values already recorded on vol. Once validation
// passes, vol's RequestedCapacityMin/Max (and, after a controller expansion,
// Capacity) are mutated in place, so callers should pass a copy and commit it
// to raft afterwards.
func (v *CSIVolume) expandVolume(vol *structs.CSIVolume, plugin *structs.CSIPlugin, capacity *csi.CapacityRange) error {
	if vol == nil || plugin == nil || capacity == nil {
		return errors.New("unexpected nil value")
	}

	reqMin, reqMax := capacity.RequiredBytes, capacity.LimitBytes
	logger := v.logger.Named("expandVolume").With(
		"vol", vol.ID,
		"requested_min", humanize.Bytes(uint64(reqMin)),
		"requested_max", humanize.Bytes(uint64(reqMax)),
	)

	switch {
	case reqMin == 0 && reqMax == 0:
		// Unset request: skip everything.
		logger.Debug("min and max values are zero")
		return nil
	case vol.RequestedCapacityMin == reqMin && vol.RequestedCapacityMax == reqMax:
		logger.Debug("requested capacity unchanged")
		return nil
	case reqMax > 0 && reqMax < reqMin:
		return fmt.Errorf("max requested capacity (%s) less than or equal to min (%s)",
			humanize.Bytes(uint64(reqMax)),
			humanize.Bytes(uint64(reqMin)))
	case reqMax > 0 && reqMax < vol.Capacity:
		return fmt.Errorf("max requested capacity (%s) less than or equal to current (%s)",
			humanize.Bytes(uint64(reqMax)),
			humanize.Bytes(uint64(vol.Capacity)))
	}

	// The request is valid, so record it for the caller to commit even when
	// the external volume is already large enough.
	vol.RequestedCapacityMin, vol.RequestedCapacityMax = reqMin, reqMax

	if reqMin <= vol.Capacity {
		return nil
	}
	if !plugin.HasControllerCapability(structs.CSIControllerSupportsExpand) {
		return errors.New("expand is not implemented by this controller plugin")
	}

	capability, err := csi.VolumeCapabilityFromStructs(vol.AttachmentMode, vol.AccessMode, vol.MountOptions)
	if err != nil {
		// Proceed anyway: the spec accepts an "unknown" (nil) capability,
		// which is what an unattached volume would have.
		logger.Debug("unable to get capability from volume", "error", err)
	}

	req := &cstructs.ClientCSIControllerExpandVolumeRequest{
		ExternalVolumeID: vol.ExternalID,
		Secrets:          vol.Secrets,
		CapacityRange:    capacity,
		VolumeCapability: capability,
	}
	req.PluginID = plugin.ID
	resp := &cstructs.ClientCSIControllerExpandVolumeResponse{}

	logger.Info("starting volume expansion")
	// The client forwards this to the controller plugin over gRPC, which may
	// in turn call out to the storage provider, so it can take a while.
	err = v.serializedControllerRPC(plugin.ID, func() error {
		return v.srv.RPC("ClientCSI.ControllerExpandVolume", req, resp)
	})
	if err != nil {
		return fmt.Errorf("unable to expand volume: %w", err)
	}
	vol.Capacity = resp.CapacityBytes
	logger.Info("controller done expanding volume")

	if !resp.NodeExpansionRequired {
		return nil
	}
	return v.nodeExpandVolume(vol, plugin, capacity)
}

// nodeExpandVolume sends a ClientCSI.NodeExpandVolume RPC for every read and
// write claim on the volume. Each client then calls the CSI node plugin that
// runs alongside the claiming allocation. Errors are collected and returned
// together, so a failure on one node does not stop expansion on the others.
func (v *CSIVolume) nodeExpandVolume(vol *structs.CSIVolume, plugin *structs.CSIPlugin, capacity *csi.CapacityRange) error {
	var mErr multierror.Error
	logger := v.logger.Named("nodeExpandVolume").
		With("volume", vol.ID, "plugin", plugin.ID)

	expand := func(claim *structs.CSIVolumeClaim) {
		if claim == nil {
			return
		}

		logger.Debug("starting volume expansion on node",
			"node_id", claim.NodeID, "alloc_id", claim.AllocationID)

		resp := &cstructs.ClientCSINodeExpandVolumeResponse{}
		req := &cstructs.ClientCSINodeExpandVolumeRequest{
			PluginID:        plugin.ID,
			VolumeID:        vol.ID,
			VolumeNamespace: vol.Namespace,
			ExternalID:      vol.ExternalID,
			Capacity:        capacity,
			Claim:           claim,
		}
		if err := v.srv.RPC("ClientCSI.NodeExpandVolume", req, resp); err != nil {
			mErr.Errors = append(mErr.Errors, err)
			// resp was not filled in, so comparing its capacity would only
			// produce a misleading "unexpected capacity" warning.
			return
		}

		if resp.CapacityBytes != vol.Capacity {
			// not necessarily an error, but maybe notable
			logger.Warn("unexpected capacity from NodeExpandVolume",
				"expected", vol.Capacity, "resp", resp.CapacityBytes)
		}
	}

	for _, claim := range vol.ReadClaims {
		expand(claim)
	}
	for _, claim := range vol.WriteClaims {
		expand(claim)
	}

	return mErr.ErrorOrNil()
}

// Delete removes volumes from their external storage provider through the
// plugin's controller, then deregisters them in a single raft write. A volume
// ID that is no longer registered is logged and skipped rather than failing
// the request, and it is left out of the deregistration.
func (v *CSIVolume) Delete(args *structs.CSIVolumeDeleteRequest, reply *structs.CSIVolumeDeleteResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIVolume.Delete", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_volume", structs.RateMetricWrite, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "volume", "delete"}, time.Now())

	allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIWriteVolume)
	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}

	if !allowVolume(aclObj, args.RequestNamespace()) || !aclObj.AllowPluginRead() {
		return structs.ErrPermissionDenied
	}

	if len(args.VolumeIDs) == 0 {
		return fmt.Errorf("missing volume IDs")
	}

	stateStore := v.srv.State()
	deregIDs := make([]string, 0, len(args.VolumeIDs))
	for _, volID := range args.VolumeIDs {
		// Check registration directly instead of matching the error returned
		// by volAndPluginLookup. Comparing it with == against a freshly built
		// fmt.Errorf value can never be true, so a skip written that way
		// never runs.
		existing, err := stateStore.CSIVolumeByID(nil, args.Namespace, volID)
		if err != nil {
			return err
		}
		if existing == nil {
			v.logger.Warn("volume to be deleted was already deregistered", "volume_id", volID)
			continue
		}

		plugin, vol, err := v.volAndPluginLookup(args.Namespace, volID)
		if err != nil {
			return err
		}
		if plugin == nil {
			return fmt.Errorf("plugin %q for volume %q not found", vol.PluginID, volID)
		}

		// NOTE: deleting the volume in the external storage provider can't be
		// made atomic with deregistration. We can't delete a volume that's
		// not registered because we need to be able to lookup its plugin.
		if err := v.deleteVolume(vol, plugin, args.Secrets); err != nil {
			return err
		}
		deregIDs = append(deregIDs, volID)
	}

	// Every requested volume was already deregistered. There is nothing to
	// write, so just report the current index of the volumes table.
	if len(deregIDs) == 0 {
		return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta)
	}

	deregArgs := &structs.CSIVolumeDeregisterRequest{
		VolumeIDs:    deregIDs,
		WriteRequest: args.WriteRequest,
	}
	_, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, deregArgs)
	if err != nil {
		v.logger.Error("csi raft apply failed", "error", err, "method", "deregister")
		return err
	}

	reply.Index = index
	v.srv.setQueryMeta(&reply.QueryMeta)
	return nil
}

// deleteVolume asks the plugin's controller, via ClientCSI.ControllerDeleteVolume
// and serialized per plugin, to delete vol from the external storage provider.
// querySecrets are layered over the volume's stored secrets for this request
// only; vol itself is not modified.
func (v *CSIVolume) deleteVolume(vol *structs.CSIVolume, plugin *structs.CSIPlugin, querySecrets structs.CSISecrets) error {
	// Merge into a fresh map. Writing into vol.Secrets directly would leak the
	// query secrets into the caller's volume, and would panic if vol.Secrets
	// is nil.
	combinedSecrets := make(structs.CSISecrets, len(vol.Secrets)+len(querySecrets))
	for k, val := range vol.Secrets {
		combinedSecrets[k] = val
	}
	// Query secrets override any secrets stored with the volume.
	for k, val := range querySecrets {
		combinedSecrets[k] = val
	}

	method := "ClientCSI.ControllerDeleteVolume"
	cReq := &cstructs.ClientCSIControllerDeleteVolumeRequest{
		ExternalVolumeID: vol.ExternalID,
		Secrets:          combinedSecrets,
	}
	cReq.PluginID = plugin.ID
	cResp := &cstructs.ClientCSIControllerDeleteVolumeResponse{}

	return v.serializedControllerRPC(plugin.ID, func() error {
		return v.srv.RPC(method, cReq, cResp)
	})
}

// ListExternal lists the volumes a plugin's controller knows about, including
// volumes that are not registered with Nomad. Paging uses the plugin's own
// tokens, and the result is truncated to args.PerPage when that is positive.
func (v *CSIVolume) ListExternal(args *structs.CSIVolumeExternalListRequest, reply *structs.CSIVolumeExternalListResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIVolume.ListExternal", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_volume", structs.RateMetricList, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "volume", "list_external"}, time.Now())

	canList := acl.NamespaceValidator(acl.NamespaceCapabilityCSIListVolume,
		acl.NamespaceCapabilityCSIReadVolume,
		acl.NamespaceCapabilityCSIMountVolume,
		acl.NamespaceCapabilityListJobs)
	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}

	// NOTE: this is the plugin's namespace, not the volume(s) because they
	// might not even be registered
	if !canList(aclObj, args.RequestNamespace()) {
		return structs.ErrPermissionDenied
	}

	snap, err := v.srv.fsm.State().Snapshot()
	if err != nil {
		return err
	}

	plugin, err := snap.CSIPluginByID(nil, args.PluginID)
	switch {
	case err != nil:
		return err
	case plugin == nil:
		return fmt.Errorf("no such plugin")
	case !plugin.HasControllerCapability(structs.CSIControllerSupportsListVolumes):
		return fmt.Errorf("unimplemented for this plugin")
	}

	req := &cstructs.ClientCSIControllerListVolumesRequest{
		MaxEntries:    args.PerPage,
		StartingToken: args.NextToken,
	}
	req.PluginID = plugin.ID
	resp := &cstructs.ClientCSIControllerListVolumesResponse{}

	if err := v.srv.RPC("ClientCSI.ControllerListVolumes", req, resp); err != nil {
		return err
	}

	entries := resp.Entries
	// The plugin should already honor MaxEntries; enforce it regardless.
	if limit := args.PerPage; limit > 0 && limit < int32(len(entries)) {
		entries = entries[:limit]
	}
	reply.Volumes = entries
	reply.NextToken = resp.NextToken

	return nil
}

// CreateSnapshot asks each source volume's controller plugin to snapshot it.
// A nil snapshot in the request aborts immediately. Other per-snapshot
// failures are collected into a multierror, and every snapshot that was
// created is appended to reply.Snapshots.
func (v *CSIVolume) CreateSnapshot(args *structs.CSISnapshotCreateRequest, reply *structs.CSISnapshotCreateResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIVolume.CreateSnapshot", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_volume", structs.RateMetricWrite, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "volume", "create_snapshot"}, time.Now())

	allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIWriteVolume)
	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}
	if !allowVolume(aclObj, args.RequestNamespace()) || !aclObj.AllowPluginRead() {
		return structs.ErrPermissionDenied
	}

	stateSnap, err := v.srv.fsm.State().Snapshot()
	if err != nil {
		return err
	}

	method := "ClientCSI.ControllerCreateSnapshot"
	var mErr multierror.Error
	for _, snap := range args.Snapshots {
		if snap == nil {
			// we intentionally don't multierror here because we're in a weird state
			return fmt.Errorf("snapshot cannot be nil")
		}

		vol, err := stateSnap.CSIVolumeByID(nil, args.RequestNamespace(), snap.SourceVolumeID)
		if err != nil {
			multierror.Append(&mErr, fmt.Errorf("error querying volume %q: %v", snap.SourceVolumeID, err))
			continue
		}
		if vol == nil {
			multierror.Append(&mErr, fmt.Errorf("no such volume %q", snap.SourceVolumeID))
			continue
		}

		// An explicit plugin on the request wins over the volume's plugin.
		pluginID := snap.PluginID
		if pluginID == "" {
			pluginID = vol.PluginID
		}

		plugin, err := stateSnap.CSIPluginByID(nil, pluginID)
		if err != nil {
			multierror.Append(&mErr,
				fmt.Errorf("error querying plugin %q: %v", pluginID, err))
			continue
		}
		if plugin == nil {
			multierror.Append(&mErr, fmt.Errorf("no such plugin %q", pluginID))
			continue
		}
		if !plugin.HasControllerCapability(structs.CSIControllerSupportsCreateDeleteSnapshot) {
			multierror.Append(&mErr,
				fmt.Errorf("plugin %q does not support snapshot", pluginID))
			continue
		}

		// Merge request secrets onto the volume's secrets in a new map.
		// Writing into vol.Secrets directly would modify the volume read from
		// state, and would panic if vol.Secrets is nil.
		secrets := make(structs.CSISecrets, len(vol.Secrets)+len(snap.Secrets))
		for k, val := range vol.Secrets {
			secrets[k] = val
		}
		for k, val := range snap.Secrets {
			secrets[k] = val
		}

		cReq := &cstructs.ClientCSIControllerCreateSnapshotRequest{
			ExternalSourceVolumeID: vol.ExternalID,
			Name:                   snap.Name,
			Secrets:                secrets,
			Parameters:             snap.Parameters,
		}
		cReq.PluginID = pluginID
		cResp := &cstructs.ClientCSIControllerCreateSnapshotResponse{}
		err = v.serializedControllerRPC(pluginID, func() error {
			return v.srv.RPC(method, cReq, cResp)
		})
		if err != nil {
			multierror.Append(&mErr, fmt.Errorf("could not create snapshot: %v", err))
			continue
		}
		reply.Snapshots = append(reply.Snapshots, &structs.CSISnapshot{
			ID:                     cResp.ID,
			ExternalSourceVolumeID: cResp.ExternalSourceVolumeID,
			SizeBytes:              cResp.SizeBytes,
			CreateTime:             cResp.CreateTime,
			IsReady:                cResp.IsReady,
		})
	}

	return mErr.ErrorOrNil()
}

// DeleteSnapshot asks each snapshot's controller plugin to delete it. A nil
// snapshot in the request aborts immediately. Other per-snapshot failures are
// collected into a multierror, and the loop moves on to the next snapshot.
func (v *CSIVolume) DeleteSnapshot(args *structs.CSISnapshotDeleteRequest, reply *structs.CSISnapshotDeleteResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIVolume.DeleteSnapshot", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_volume", structs.RateMetricWrite, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "volume", "delete_snapshot"}, time.Now())

	canWrite := acl.NamespaceValidator(acl.NamespaceCapabilityCSIWriteVolume)
	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}

	// NOTE: this is the plugin's namespace, not the snapshot(s) because we
	// don't track snapshots in the state store at all and their source
	// volume(s) because they might not even be registered
	if !canWrite(aclObj, args.RequestNamespace()) || !aclObj.AllowPluginRead() {
		return structs.ErrPermissionDenied
	}

	stateSnap, err := v.srv.fsm.State().Snapshot()
	if err != nil {
		return err
	}

	var mErr multierror.Error
	for _, snap := range args.Snapshots {
		if snap == nil {
			// we intentionally don't multierror here because we're in a weird state
			return fmt.Errorf("snapshot cannot be nil")
		}

		plugin, err := stateSnap.CSIPluginByID(nil, snap.PluginID)
		switch {
		case err != nil:
			multierror.Append(&mErr,
				fmt.Errorf("could not query plugin %q: %v", snap.PluginID, err))
			continue
		case plugin == nil:
			multierror.Append(&mErr, fmt.Errorf("no such plugin"))
			continue
		case !plugin.HasControllerCapability(structs.CSIControllerSupportsCreateDeleteSnapshot):
			multierror.Append(&mErr, fmt.Errorf("plugin does not support snapshot"))
			continue
		}

		req := &cstructs.ClientCSIControllerDeleteSnapshotRequest{
			ID:      snap.ID,
			Secrets: snap.Secrets,
		}
		req.PluginID = plugin.ID
		resp := &cstructs.ClientCSIControllerDeleteSnapshotResponse{}

		if err := v.serializedControllerRPC(plugin.ID, func() error {
			return v.srv.RPC("ClientCSI.ControllerDeleteSnapshot", req, resp)
		}); err != nil {
			multierror.Append(&mErr, fmt.Errorf("could not delete %q: %v", snap.ID, err))
		}
	}
	return mErr.ErrorOrNil()
}

// ListSnapshots lists the snapshots known to a plugin's controller. Paging
// uses the plugin's own tokens, and the result is truncated to args.PerPage
// when that is positive.
func (v *CSIVolume) ListSnapshots(args *structs.CSISnapshotListRequest, reply *structs.CSISnapshotListResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIVolume.ListSnapshots", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_volume", structs.RateMetricList, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "volume", "list_snapshots"}, time.Now())

	canList := acl.NamespaceValidator(acl.NamespaceCapabilityCSIListVolume,
		acl.NamespaceCapabilityCSIReadVolume,
		acl.NamespaceCapabilityCSIMountVolume,
		acl.NamespaceCapabilityListJobs)
	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}

	// NOTE: this is the plugin's namespace, not the volume(s) because they
	// might not even be registered
	if !canList(aclObj, args.RequestNamespace()) {
		return structs.ErrPermissionDenied
	}

	snap, err := v.srv.fsm.State().Snapshot()
	if err != nil {
		return err
	}

	plugin, err := snap.CSIPluginByID(nil, args.PluginID)
	switch {
	case err != nil:
		return err
	case plugin == nil:
		return fmt.Errorf("no such plugin")
	case !plugin.HasControllerCapability(structs.CSIControllerSupportsListSnapshots):
		return fmt.Errorf("plugin does not support listing snapshots")
	}

	req := &cstructs.ClientCSIControllerListSnapshotsRequest{
		MaxEntries:    args.PerPage,
		StartingToken: args.NextToken,
		Secrets:       args.Secrets,
	}
	req.PluginID = plugin.ID
	resp := &cstructs.ClientCSIControllerListSnapshotsResponse{}

	if err := v.srv.RPC("ClientCSI.ControllerListSnapshots", req, resp); err != nil {
		return err
	}

	entries := resp.Entries
	// The plugin should already honor MaxEntries; enforce it regardless.
	if limit := args.PerPage; limit > 0 && limit < int32(len(entries)) {
		entries = entries[:limit]
	}
	reply.Snapshots = entries
	reply.NextToken = resp.NextToken

	return nil
}

// CSIPlugin wraps the structs.CSIPlugin with request data and server context.
// Its methods serve the CSIPlugin.List, CSIPlugin.Get and CSIPlugin.Delete
// RPCs. srv is the server handling the request, ctx is the RPC context used
// to authenticate the caller, and logger is the "csi_plugin" sub-logger when
// the endpoint is built by NewCSIPluginEndpoint.
type CSIPlugin struct {
	srv    *Server
	ctx    *RPCContext
	logger hclog.Logger
}

// NewCSIPluginEndpoint returns a CSIPlugin endpoint bound to srv and ctx. It
// logs through the server logger's "csi_plugin" sub-logger.
func NewCSIPluginEndpoint(srv *Server, ctx *RPCContext) *CSIPlugin {
	endpoint := &CSIPlugin{srv: srv, ctx: ctx}
	endpoint.logger = srv.logger.Named("csi_plugin")
	return endpoint
}

// List replies, as a blocking query, with stubs for every CSI plugin, or only
// for those whose ID starts with args.Prefix. The caller needs plugin list
// permission.
func (v *CSIPlugin) List(args *structs.CSIPluginListRequest, reply *structs.CSIPluginListResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIPlugin.List", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_plugin", structs.RateMetricList, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "plugin", "list"}, time.Now())

	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}
	if !aclObj.AllowPluginList() {
		return structs.ErrPermissionDenied
	}

	opts := blockingOptions{
		queryOpts: &args.QueryOptions,
		queryMeta: &reply.QueryMeta,
		run: func(ws memdb.WatchSet, store *state.StateStore) error {
			var (
				iter memdb.ResultIterator
				err  error
			)
			if args.Prefix == "" {
				iter, err = store.CSIPlugins(ws)
			} else {
				iter, err = store.CSIPluginsByIDPrefix(ws, args.Prefix)
			}
			if err != nil {
				return err
			}

			// Start from an empty, non-nil slice so an empty result stays
			// distinguishable from an unset one.
			stubs := []*structs.CSIPluginListStub{}
			for raw := iter.Next(); raw != nil; raw = iter.Next() {
				stubs = append(stubs, raw.(*structs.CSIPlugin).Stub())
			}

			reply.Plugins = stubs
			return v.srv.replySetIndex(csiPluginTable, &reply.QueryMeta)
		},
	}
	return v.srv.blockingRPC(&opts)
}

// Get fetches detailed information about a single plugin as a blocking query.
// Allocation stubs are only attached when the caller may read jobs in the
// request namespace. They are then limited to that namespace (or all
// namespaces for the wildcard) and to namespaces the caller can read jobs in.
func (v *CSIPlugin) Get(args *structs.CSIPluginGetRequest, reply *structs.CSIPluginGetResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIPlugin.Get", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_plugin", structs.RateMetricRead, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "plugin", "get"}, time.Now())

	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}
	if !aclObj.AllowPluginRead() {
		return structs.ErrPermissionDenied
	}

	ns := args.RequestNamespace()
	withAllocs := aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob)

	if args.ID == "" {
		return fmt.Errorf("missing plugin ID")
	}

	opts := blockingOptions{
		queryOpts: &args.QueryOptions,
		queryMeta: &reply.QueryMeta,
		run: func(ws memdb.WatchSet, store *state.StateStore) error {
			snap, err := store.Snapshot()
			if err != nil {
				return err
			}

			plug, err := snap.CSIPluginByID(ws, args.ID)
			if err != nil || plug == nil {
				// A missing plugin is not an error; the reply is left empty.
				return err
			}

			// Without namespace access there are no allocations to show, so
			// skip the copy and denormalization entirely. withAllocs is true
			// for the wildcard namespace.
			if withAllocs {
				if plug, err = snap.CSIPluginDenormalize(ws, plug.Copy()); err != nil {
					return err
				}

				var visible []*structs.AllocListStub
				for _, alloc := range plug.Allocations {
					inScope := ns == structs.AllNamespacesSentinel || alloc.Namespace == ns
					if inScope && aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadJob) {
						visible = append(visible, alloc)
					}
				}
				plug.Allocations = visible
			}

			reply.Plugin = plug
			return v.srv.replySetIndex(csiPluginTable, &reply.QueryMeta)
		},
	}
	return v.srv.blockingRPC(&opts)
}

// Delete removes a plugin through a CSIPluginDeleteRequest raft write.
// Only management tokens may call it. If the raft apply fails with
// structs.ErrCSIPluginInUse, the error is not returned to the caller and the
// reply still carries the returned index.
func (v *CSIPlugin) Delete(args *structs.CSIPluginDeleteRequest, reply *structs.CSIPluginDeleteResponse) error {

	authErr := v.srv.Authenticate(v.ctx, args)
	if done, err := v.srv.forward("CSIPlugin.Delete", args, args, reply); done {
		return err
	}
	v.srv.MeasureRPCRate("csi_plugin", structs.RateMetricWrite, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "plugin", "delete"}, time.Now())

	// Only management tokens may delete plugins.
	aclObj, err := v.srv.ResolveACL(args)
	if err != nil {
		return err
	}
	if !aclObj.IsManagement() {
		return structs.ErrPermissionDenied
	}

	if args.ID == "" {
		return fmt.Errorf("missing plugin ID")
	}

	_, index, err := v.srv.raftApply(structs.CSIPluginDeleteRequestType, args)
	tolerated := err == nil || errors.Is(err, structs.ErrCSIPluginInUse)
	if !tolerated {
		v.logger.Error("csi raft apply failed", "error", err, "method", "delete")
		return err
	}

	reply.Index = index
	v.srv.setQueryMeta(&reply.QueryMeta)
	return nil
}
